package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
import com.atguigu.gmall.realtime.beans.OrderDetail;
import com.atguigu.gmall.realtime.beans.OrderInfo;
import com.atguigu.gmall.realtime.beans.OrderWide;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;

/**
 * Author: Felix
 * Date: 2021/9/18
 * Desc: Order wide table.
 *
 * <p>Reads order headers ({@code dwd_order_info}) and order lines ({@code dwd_order_detail})
 * from Kafka, joins them per order id with an event-time interval join, enriches every joined
 * record with user, province, SKU, SPU, category3 and trademark dimension attributes through
 * {@link DimAsyncFunction}, and writes the result as JSON to the {@code dwm_order_wide} topic.
 */
public class OrderWideApp {

    /** Timeout, in seconds, applied to every asynchronous dimension lookup. */
    private static final long DIM_JOIN_TIMEOUT_SECONDS = 60L;

    /**
     * Builds the order-wide pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        //TODO 1. Prepare the basic environment
        //1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Set the parallelism
        env.setParallelism(4);
        /*
        //TODO 2. Configure checkpointing (currently disabled)
        //2.1 Enable checkpointing
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //2.2 Set the checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        //2.3 Decide whether checkpoints are retained after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 Set the restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
        //2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("xxxx"));
        //2.6 Set the user used to access HDFS
        System.setProperty("HADOOP_USER_NAME","atguigu");
        */
        //TODO 3. Read data from Kafka
        //3.1 Declare the topics to consume and the consumer group
        String orderInfoTopic = "dwd_order_info";
        String orderDetailTopic = "dwd_order_detail";
        String groupId = "order_wide_app_group";

        //3.2 Consumer for the order topic
        FlinkKafkaConsumer<String> orderInfoKafkaSource = MyKafkaUtil.getKafkaSourceFunction(orderInfoTopic, groupId);

        //3.3 Consumer for the order-detail topic
        FlinkKafkaConsumer<String> orderDetailKafkaSource = MyKafkaUtil.getKafkaSourceFunction(orderDetailTopic, groupId);

        //3.4 Consume orders as a stream of JSON strings
        DataStreamSource<String> orderInfoStrDS = env.addSource(orderInfoKafkaSource);

        //3.5 Consume order details as a stream of JSON strings
        DataStreamSource<String> orderDetailStrDS = env.addSource(orderDetailKafkaSource);

        //TODO 4. Convert the stream element type     jsonStr -> entity object
        //4.1 Orders
        SingleOutputStreamOperator<OrderInfo> orderInfoDS = orderInfoStrDS.map(
            new RichMapFunction<String, OrderInfo>() {
                // Created in open() so that each function instance owns its own formatter.
                private SimpleDateFormat sdf;

                @Override
                public void open(Configuration parameters) throws Exception {
                    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                }

                @Override
                public OrderInfo map(String jsonStr) throws Exception {
                    OrderInfo orderInfo = JSON.parseObject(jsonStr, OrderInfo.class);
                    // Derive the epoch-millisecond timestamp from the textual create time.
                    String createTime = orderInfo.getCreate_time();
                    orderInfo.setCreate_ts(sdf.parse(createTime).getTime());
                    return orderInfo;
                }
            }
        );
        //4.2 Order details
        SingleOutputStreamOperator<OrderDetail> orderDetailDS = orderDetailStrDS.map(
            new RichMapFunction<String, OrderDetail>() {
                // Created in open() so that each function instance owns its own formatter.
                private SimpleDateFormat sdf;

                @Override
                public void open(Configuration parameters) throws Exception {
                    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                }

                @Override
                public OrderDetail map(String jsonStr) throws Exception {
                    OrderDetail orderDetail = JSON.parseObject(jsonStr, OrderDetail.class);
                    // Derive the epoch-millisecond timestamp from the textual create time.
                    String createTime = orderDetail.getCreate_time();
                    orderDetail.setCreate_ts(sdf.parse(createTime).getTime());
                    return orderDetail;
                }
            }
        );

        //orderInfoDS.print(">>>");
        //orderDetailDS.print("####");

        //TODO 5. Assign watermarks and extract the event-time field
        //5.1 Orders: tolerate up to 3 seconds of out-of-orderness
        SingleOutputStreamOperator<OrderInfo> orderInfoWithWatermarkDS = orderInfoDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<OrderInfo>() {
                        @Override
                        public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                            return orderInfo.getCreate_ts();
                        }
                    }
                )

        );
        //5.2 Order details: tolerate up to 3 seconds of out-of-orderness
        SingleOutputStreamOperator<OrderDetail> orderDetailWithWatermarkDS = orderDetailDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<OrderDetail>() {
                        @Override
                        public long extractTimestamp(OrderDetail orderDetail, long recordTimestamp) {
                            return orderDetail.getCreate_ts();
                        }
                    }
                )
        );
        //TODO 6. Key both streams to define the join condition
        //6.1 Orders, keyed by order id
        KeyedStream<OrderInfo, Long> orderInfoKeyedDS = orderInfoWithWatermarkDS.keyBy(OrderInfo::getId);

        //6.2 Order details, keyed by the id of the order they belong to
        KeyedStream<OrderDetail, Long> orderDetailKeyedDS = orderDetailWithWatermarkDS.keyBy(OrderDetail::getOrder_id);

        //TODO 7. Join orders with order details using an interval join
        // A detail matches an order when its event time is within +/- 5 seconds of the order's.
        SingleOutputStreamOperator<OrderWide> joinedDS = orderInfoKeyedDS
            .intervalJoin(orderDetailKeyedDS)
            .between(Time.seconds(-5), Time.seconds(5))
            .process(
                new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(orderInfo, orderDetail));
                    }
                }
            );

        //joinedDS.print(">>>>>");

        //TODO 8. Join with the user dimension
        // Apply the asynchronous operation to the stream
        SingleOutputStreamOperator<OrderWide> orderWideWithUserInfoDS = AsyncDataStream.unorderedWait(
            joinedDS,
            new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
                @Override
                public void join(OrderWide orderWide, JSONObject userDimJsonObj) throws Exception {
                    orderWide.setUser_gender(userDimJsonObj.getString("GENDER"));

                    // A missing birthday leaves the age unset instead of failing the record.
                    String birthdayStr = userDimJsonObj.getString("BIRTHDAY");
                    if (birthdayStr != null && !birthdayStr.isEmpty()) {
                        orderWide.setUser_age(OrderWideApp.calculateAge(birthdayStr, LocalDate.now()));
                    }
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getUser_id());
                }
            },
            DIM_JOIN_TIMEOUT_SECONDS,
            TimeUnit.SECONDS
        );

        //orderWideWithUserInfoDS.print(">>>>>");

        //TODO 9. Join with the province dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(
            orderWideWithUserInfoDS,
            new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                @Override
                public void join(OrderWide orderWide, JSONObject dimInfo) throws Exception {
                    orderWide.setProvince_name(dimInfo.getString("NAME"));
                    orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                    orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                    orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getProvince_id());
                }
            },
            DIM_JOIN_TIMEOUT_SECONDS,
            TimeUnit.SECONDS
        );
        //orderWideWithProvinceDS.print(">>>>");

        //TODO 10. Join with the SKU dimension
        // Must run before the SPU, category3 and trademark joins: it fills the ids they look up.
        SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
            orderWideWithProvinceDS,
            new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
                @Override
                public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                    orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                    orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                    orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                    orderWide.setTm_id(jsonObject.getLong("TM_ID"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getSku_id());
                }
            },
            DIM_JOIN_TIMEOUT_SECONDS,
            TimeUnit.SECONDS
        );

        //TODO 11. Join with the SPU dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
            orderWideWithSkuDS,
            new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
                @Override
                public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                    orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getSpu_id());
                }
            },
            DIM_JOIN_TIMEOUT_SECONDS,
            TimeUnit.SECONDS
        );

        //TODO 12. Join with the category3 dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
            orderWideWithSpuDS,
            new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
                @Override
                public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                    orderWide.setCategory3_name(jsonObject.getString("NAME"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getCategory3_id());
                }
            },
            DIM_JOIN_TIMEOUT_SECONDS,
            TimeUnit.SECONDS
        );

        //TODO 13. Join with the trademark dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
            orderWideWithCategory3DS,
            new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
                @Override
                public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                    orderWide.setTm_name(jsonObject.getString("TM_NAME"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getTm_id());
                }
            },
            DIM_JOIN_TIMEOUT_SECONDS,
            TimeUnit.SECONDS
        );


        orderWideWithTmDS.print(">>>>>>");

        //TODO 14. Write the order-wide records back to the Kafka dwm layer
        orderWideWithTmDS
            .map(JSON::toJSONString)
            .addSink(MyKafkaUtil.getKafkaSinkFunction("dwm_order_wide"));

        env.execute();
    }

    /**
     * Returns the number of whole calendar years between a birthday and a reference date.
     *
     * <p>Counting calendar years (rather than dividing elapsed milliseconds by a fixed 365-day
     * year) keeps the result from rolling over a few days before the actual birthday.
     *
     * @param birthdayStr birthday text starting with a {@code yyyy-MM-dd} date; must not be null
     * @param today       the date on which the age is evaluated
     * @return completed years from the birthday to {@code today}
     * @throws ParseException if {@code birthdayStr} does not start with a parsable date
     */
    private static int calculateAge(String birthdayStr, LocalDate today) throws ParseException {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used for every call.
        // NOTE(review): the threading model of DimAsyncFunction is not visible here - confirm.
        SimpleDateFormat birthdayFormat = new SimpleDateFormat("yyyy-MM-dd");
        long birthdayMillis = birthdayFormat.parse(birthdayStr).getTime();
        // Convert back with the same default zone the formatter parsed in, so the date is kept.
        LocalDate birthDate = Instant.ofEpochMilli(birthdayMillis)
            .atZone(ZoneId.systemDefault())
            .toLocalDate();
        return Period.between(birthDate, today).getYears();
    }
}
